package com.apobates.jforum.grief.schema2doc.reactive.dialect;

import com.apobates.jforum.grief.schema2doc.core.SchemaTableStrategy;
import com.apobates.jforum.grief.schema2doc.core.entity.Column;
import com.apobates.jforum.grief.schema2doc.core.entity.Nullable;
import com.apobates.jforum.grief.schema2doc.core.entity.PrimayKey;
import com.apobates.jforum.grief.schema2doc.core.entity.Table;
import com.apobates.jforum.grief.schema2doc.core.schema.SchemaDialect;
import com.apobates.jforum.grief.schema2doc.reactive.SchemaQueryFlowableResult;
import com.apobates.jforum.grief.schema2doc.reactive.schema.AbstractRxJavaQueryExecutor;
import io.reactivex.Flowable;
import io.reactivex.Single;
import org.davidmoten.rx.jdbc.Database;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * RxJava2-based supplier of PostgreSQL schema metadata (tables, columns and
 * primary keys), executed through rxjava2-jdbc.
 *
 * @see <a href="https://github.com/davidmoten/rxjava-jdbc">https://github.com/davidmoten/rxjava-jdbc</a>
 */
public class RxJavaPostgreSQLExecutor extends AbstractRxJavaQueryExecutor {

    private static final Logger logger = LoggerFactory.getLogger(RxJavaPostgreSQLExecutor.class);

    /**
     * Creates an executor bound to the given database handle and dialect.
     *
     * @param db      the rxjava2-jdbc database used to run the metadata queries
     * @param dialect the schema dialect forwarded to the parent executor
     */
    public RxJavaPostgreSQLExecutor(Database db, SchemaDialect dialect) {
        super(db, dialect);
    }

    /**
     * Streams the tables of one PostgreSQL schema together with their comments.
     *
     * @param db            the database to query
     * @param dbName        the database name stored on every emitted {@link Table}
     * @param schema        the schema whose tables are listed; must be present
     * @param tableStrategy supplies an optional extra SQL condition and an in-memory
     *                      name filter applied to the emitted tables
     * @return a flow of tables that pass both the SQL condition and the name filter
     * @throws IllegalArgumentException if {@code schema} is empty; an empty
     *                                  {@code dbName} is reported through the
     *                                  returned flow when the first row is mapped
     */
    @Override
    protected Flowable<Table> getTables(Database db, Optional<String> dbName, Optional<String> schema, SchemaTableStrategy tableStrategy) throws IllegalArgumentException {
        final String schemaName = schema.orElseThrow(
                () -> new IllegalArgumentException("A schema name is required to list PostgreSQL tables"));
        // pg_class is joined through pg_namespace as well as by name: relation names are
        // only unique within a schema, so a name-only join returns rows of unrelated schemas.
        final StringBuilder sql = new StringBuilder(
                "SELECT t.tablename, obj_description(c.oid) AS description, c.oid "
                        + "FROM pg_tables t "
                        + "JOIN pg_class c ON t.tablename=c.relname "
                        + "JOIN pg_namespace n ON n.oid=c.relnamespace AND n.nspname=t.schemaname "
                        + "WHERE t.schemaname=:schema");
        tableStrategy.sql(SchemaDialect.PostgreSQL)
                .ifPresent(condition -> sql
                        .append(" AND ")
                        .append(condition.replaceAll(SchemaTableStrategy.placeholder, "tablename")));
        logger.debug("[RxJava-PostgreSQL-Table]Query Table, Db::{}", dbName);
        return db
                .select(sql.toString())
                .parameter("schema", schemaName)
                .get(rs -> Table.noSchema(
                        0,
                        dbName.orElseThrow(
                                () -> new IllegalArgumentException("A database name is required to build a table entry")),
                        rs.getString("tablename"),
                        rs.getString("description")))
                .filter(table -> tableStrategy.filter().test(table.getTableName()));
    }

    /**
     * Loads the column definitions of a table.
     *
     * <p>The primary-key columns are fetched first with a blocking call, i.e. before
     * the returned {@link Single} is subscribed to.
     *
     * @param db    the database to query
     * @param table the table whose columns are requested; only its name is used
     * @return a single emitting the list of columns of the table
     */
    @Override
    protected Single<List<Column>> getColumns(Database db, Table table) {
        // pg_get_expr(adbin, adrelid) replaces pg_attrdef.adsrc, which PostgreSQL 12 removed.
        // NOTE(review): "is_nullable" is filled from attnotnull (true = NOT NULL); confirm
        // that Nullable.getInstance interprets the value accordingly.
        // NOTE(review): neither side of the join filters by schema, so same-named tables
        // in different schemas may be mixed; the Table schema is not visible here.
        String sql = "SELECT " +
                "tmp.*, isc.numeric_scale, case when coalesce(character_maximum_length, numeric_precision, -1) = -1 then null else coalesce(character_maximum_length, numeric_precision, -1) end as column_size " +
                "FROM information_schema.columns isc " +
                "LEFT JOIN (" +
                "   SELECT a.attname as column_name,concat_ws('', t.typname, SUBSTRING(format_type(a.atttypid, a.atttypmod) from '\\(.*\\)')) as udt_name, d.description,a.attnotnull as is_nullable,pg_get_expr(def.adbin, def.adrelid) as column_default " +
                "   FROM pg_attribute a " +
                "   LEFT JOIN pg_description d ON d.objoid = a.attrelid and d.objsubid = a.attnum " +
                "   LEFT JOIN pg_class c ON a.attrelid = c.oid " +
                "   LEFT JOIN pg_type t ON a.atttypid = t.oid " +
                "   LEFT JOIN pg_attrdef def ON a.attrelid = def.adrelid AND a.attnum = def.adnum WHERE a.attnum >= 0 and c.relname = :tablename) tmp on tmp.column_name = isc.column_name " +
                "WHERE isc.table_name = :tablename";
        logger.debug("[RxJava-PostgreSQL-Column]Query Column::{}", table);
        final String tableName = table.getTableName();
        // Blocking lookup, executed while the Single is being assembled.
        final List<String> pks = getPrimayKey(db, tableName);
        // Run the column query through rxjava2-jdbc and map every row to a Column.
        return db.select(sql)
                .parameter("tablename", tableName)
                .get(rs -> {
                    String columnName = rs.getString("column_name");
                    PrimayKey primaryKeyFlag = pks.contains(columnName) ? PrimayKey.YES : PrimayKey.NO;
                    return Column.basic(
                                    columnName,
                                    rs.getString("udt_name"),
                                    rs.getString("column_size"),
                                    Nullable.getInstance(rs.getString("is_nullable")),
                                    rs.getString("description"))
                            .toFull(
                                    rs.getString("numeric_scale"),
                                    primaryKeyFlag,
                                    // Must match the "column_default" alias of the sub-select.
                                    rs.getString("column_default"));
                })
                .toList();
    }

    /**
     * Returns the names of the primary-key columns of a table.
     *
     * <p>This call blocks the calling thread until the query has completed.
     *
     * @param db        the database to query
     * @param tableName the table whose primary key is looked up
     * @return the primary-key column names; empty when the query yields no rows
     */
    private List<String> getPrimayKey(Database db, String tableName){
        String sql = "SELECT result.COLUMN_NAME, result.KEY_SEQ, result.PK_NAME FROM(SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, ct.relname AS TABLE_NAME, a.attname AS COLUMN_NAME, (information_schema._pg_expandarray(i.indkey)).n AS KEY_SEQ, ci.relname AS PK_NAME, information_schema._pg_expandarray(i.indkey) AS KEYS, a.attnum AS A_ATTNUM FROM pg_catalog.pg_class ct JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) JOIN pg_catalog.pg_index i ON (a.attrelid = i.indrelid) JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE i.indisprimary) result WHERE result.A_ATTNUM = (result.KEYS).x AND result.TABLE_NAME = :TN ";
        return db
                .select(sql)
                .parameter("TN", tableName)
                .get(row -> row.getString("column_name"))
                .toList()
                .blockingGet();
    }
}
